-
Notifications
You must be signed in to change notification settings - Fork 244
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[query] Support indexed table reads in lowered execution #9522
[query] Support indexed table reads in lowered execution #9522
Conversation
a large diff, but not that complex. The changes to Emit/EmitStream are rote -- adding The design here (serializing the unstaged closures to generate decoders and such) is pretty gross, and I intend to fix this when we have infrastructure to mix interpreted TableIRs and lowered TableStages, and we can remove the interpreted indexed reader. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the delay. Just a couple nits.
val ctxMemo = ctxStruct.asBaseStruct.memoize(cb, "pnri_ctx_struct") | ||
cb.assign(idxr, getIndexReader(ctxMemo | ||
.loadField(cb, "indexPath") | ||
.handle(cb, cb._fatal("")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use IEmitCode.get
when you don't have a specific error message.
|
||
spec.rowsSpec.readTableStage(ctx, spec.rowsComponent.absolutePath(params.path), requestedType.rowType, partitioner, filterIntervals).apply(globals) | ||
// | ||
// val rowsPath = spec.rowsComponent.absolutePath(params.path) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delete commented out old code?
addressed, thanks for the comments. |
CHANGELOG: Fix #13998 which appeared in 0.2.58 and prevented reading from a networked filesystem mounted within the filesystem of the worker node for certain pipelines (those that did not trigger "lowering"). We use the IndexReader in `PartitionNativeIntervalReader`, `PartitionNativeReaderIndexed`, and `PartitionZippedIndexedNativeReader`. 1. `PartitionNativeIntervalReader` is only used by `query_table`. 2. `PartitionNativeReaderIndexed` is only used by `IndexedRVDSpec2.readTableStage` which is used by `TableNativeReader` when there is a new partitioner. 3. `PartitionZippedIndexedNativeReader` is only sued by `AbstractRVDSpec.readZippedLowered` when there is a new partitioner. Two is for tables, three is for matrix tables. In `readZippedLowered` we explicitly [drop the file protocol](https://github.com/hail-is/hail/blob/1dedf3c63f9aabf1b6ce538165360056f82f76e4/hail/src/main/scala/is/hail/rvd/AbstractRVDSpec.scala#L154-L155): ``` val absPathLeft = removeFileProtocol(pathLeft) val absPathRight = removeFileProtocol(pathRight) ``` We have done this, by various names, since this lowered code path was added. I added `removeFileProtocol` because stripping the protocol in Query-on-Batch prevented the reading and writing of gs:// URIs, the only URIs I could read in QoB. `uriPath` (the function whose use I replaced with `removeFileProtocol`) was added by Cotton [a very long time ago](92a9936). It seems he added it so that he could use HDFS to generate a temporary file path on the local filesystem but pass the file path to binary tools that know nothing of HDFS and file:// URIs. #9522 added the lowered code path and thus introduced this bug. It attempted to mirror the extant code in [`readIndexedPartitions`](https://github.com/hail-is/hail/blob/2b0aded9206849252b453dd80710cea8d2156793/hail/src/main/scala/is/hail/HailContext.scala#L421-L440) which *does not* strip any protocols from the path. This has gone undetected because we never try to read data through the OS's filesystem. We always use gs://, Azure, or s3:// because we do not test in environments that have a networked file system mounted in the OS's filesystem. To replicate this bug (and add a test for it), we would need a cluster with a lustre file system (or another networked filesystem). This would be a fairly large lift. The fix is trivial: just never intentionally strip the protocol!
CHANGELOG: Fixes a serious, but likely rare, bug in the Table/MatrixTable reader, which has been present since Sep 2020. It manifests as many (around half or more) of the rows being dropped. This could only happen when 1) reading a (matrix)table whose partitioning metadata allows rows with the same key to be split across neighboring partitions, and 2) reading it with a different partitioning than it was written. 1) would likely only happen by reading data keyed by locus and alleles, and rekeying it to only locus before writing. 2) would likely only happen by using the `_intervals` or `_n_partitions` arguments to `read_(matrix)_table`, or possibly `repartition`. Please reach out to us if you're concerned you may have been affected by this. This fixes a serious and longstanding bug in `IndexedRVDSpec2`, which appears to have been around since this code was first added in #9522 almost four years ago. It was reported in this [zulip thread](https://hail.zulipchat.com/#narrow/stream/123010-Hail-Query-0.2E2-support/topic/Number.20of.20rows.20changing.20with.20partitioning). I want to do further work to better characterize exactly what it takes to be affected by this bug, but I think you must have a table or matrixtable on disk which has duplicate keys, and moreover keys which span neighboring partitions, and then you must read the data with a different partitioner. The root of the issue is an invalid assumption made in the code. To read data written with partitioner `p1` using new partitioner `p2`, it first computes the "intersection", or common refinement, of the two. It then assumes that each partition in the refinement overlaps exactly one partition of `p1`. But this is only true if the partitions of `p1` are themselves mutually disjoint, which is usually but not necessarily true. For example, suppose `p1 = [ [1, 5], [5, 8] ]` is the old partitioner, and `p2 = [ [1, 4), [4, 8] ]` is the new. Note that the two input partitions are not disjoint, as the key `5` is allowed in both. The common refinement would then be `[ [1, 4), [4, 5], [5, 8] ]`. For each partition in the refinement, we want to read in the corresponding range from the appropriate input partition, then we want to group the partitions in the refinement to match the new partitioner. The code finds "the appropriate input partition" by taking the first input partition which overlaps the refinement partition, using `lowerBoundInterval`. That works if there is only one overlapping input partition, but here fails, since the refinement partition `[5, 8]` overlaps both input partitions. So the code mistakenly reads from the input partition `[1, 5]` to produce the refinement partition `[5, 8]`, and so completely drops all rows in the input `[5, 8]`. In practice, I think the most likely way to run into this (and the way it was found by a user) is to have a dataset keyed by `["locus", "alleles"]`, which has split multi-allelics, so there are multiple rows with the same locus. Then shorten the key to `["locus"]`, write the dataset to disk, and read it back with a different partitioning, e.g. by passing a `_n_partitions` argument to `read_table` or `read_matrix_table`. For instance, if the partitioning was originally `[ [{1:1, ["A"]}, {1:500, ["G"]}), [{1:500, ["G"]}, {1:1000, ["C"]}] ]`, then after shortening the key it would be `[ [1:1, 1:500], [1:500, 1:1000] ]`. Notice that even though the original partitioning had no overlap, it does after shortening the key, because rows with locus `1:500` with alleles less than `["G"]` are allowed in the first partition, so we have to make the right endpoint inclusive after shortening. You would then need to write this rekeyed dataset to disk and read it back with different partitioning (note that `ds.repartition` is enough to do this in the batch backend). I still need to think through what holes in our testing allowed this to remain undetected for so long, and attempt to plug them. We should also plan for what to tell a user who is concerned they may have been affected by this in the past.
CHANGELOG: Fixes a serious, but likely rare, bug in the Table/MatrixTable reader, which has been present since Sep 2020. It manifests as many (around half or more) of the rows being dropped. This could only happen when 1) reading a (matrix)table whose partitioning metadata allows rows with the same key to be split across neighboring partitions, and 2) reading it with a different partitioning than it was written. 1) would likely only happen by reading data keyed by locus and alleles, and rekeying it to only locus before writing. 2) would likely only happen by using the `_intervals` or `_n_partitions` arguments to `read_(matrix)_table`, or possibly `repartition`. Please reach out to us if you're concerned you may have been affected by this. This fixes a serious and longstanding bug in `IndexedRVDSpec2`, which appears to have been around since this code was first added in hail-is#9522 almost four years ago. It was reported in this [zulip thread](https://hail.zulipchat.com/#narrow/stream/123010-Hail-Query-0.2E2-support/topic/Number.20of.20rows.20changing.20with.20partitioning). I want to do further work to better characterize exactly what it takes to be affected by this bug, but I think you must have a table or matrixtable on disk which has duplicate keys, and moreover keys which span neighboring partitions, and then you must read the data with a different partitioner. The root of the issue is an invalid assumption made in the code. To read data written with partitioner `p1` using new partitioner `p2`, it first computes the "intersection", or common refinement, of the two. It then assumes that each partition in the refinement overlaps exactly one partition of `p1`. But this is only true if the partitions of `p1` are themselves mutually disjoint, which is usually but not necessarily true. For example, suppose `p1 = [ [1, 5], [5, 8] ]` is the old partitioner, and `p2 = [ [1, 4), [4, 8] ]` is the new. Note that the two input partitions are not disjoint, as the key `5` is allowed in both. The common refinement would then be `[ [1, 4), [4, 5], [5, 8] ]`. For each partition in the refinement, we want to read in the corresponding range from the appropriate input partition, then we want to group the partitions in the refinement to match the new partitioner. The code finds "the appropriate input partition" by taking the first input partition which overlaps the refinement partition, using `lowerBoundInterval`. That works if there is only one overlapping input partition, but here fails, since the refinement partition `[5, 8]` overlaps both input partitions. So the code mistakenly reads from the input partition `[1, 5]` to produce the refinement partition `[5, 8]`, and so completely drops all rows in the input `[5, 8]`. In practice, I think the most likely way to run into this (and the way it was found by a user) is to have a dataset keyed by `["locus", "alleles"]`, which has split multi-allelics, so there are multiple rows with the same locus. Then shorten the key to `["locus"]`, write the dataset to disk, and read it back with a different partitioning, e.g. by passing a `_n_partitions` argument to `read_table` or `read_matrix_table`. For instance, if the partitioning was originally `[ [{1:1, ["A"]}, {1:500, ["G"]}), [{1:500, ["G"]}, {1:1000, ["C"]}] ]`, then after shortening the key it would be `[ [1:1, 1:500], [1:500, 1:1000] ]`. Notice that even though the original partitioning had no overlap, it does after shortening the key, because rows with locus `1:500` with alleles less than `["G"]` are allowed in the first partition, so we have to make the right endpoint inclusive after shortening. You would then need to write this rekeyed dataset to disk and read it back with different partitioning (note that `ds.repartition` is enough to do this in the batch backend). I still need to think through what holes in our testing allowed this to remain undetected for so long, and attempt to plug them. We should also plan for what to tell a user who is concerned they may have been affected by this in the past.
CHANGELOG: Fixes a serious, but likely rare, bug in the Table/MatrixTable reader, which has been present since Sep 2020. It manifests as many (around half or more) of the rows being dropped. This could only happen when 1) reading a (matrix)table whose partitioning metadata allows rows with the same key to be split across neighboring partitions, and 2) reading it with a different partitioning than it was written. 1) would likely only happen by reading data keyed by locus and alleles, and rekeying it to only locus before writing. 2) would likely only happen by using the `_intervals` or `_n_partitions` arguments to `read_(matrix)_table`, or possibly `repartition`. Please reach out to us if you're concerned you may have been affected by this. This fixes a serious and longstanding bug in `IndexedRVDSpec2`, which appears to have been around since this code was first added in #9522 almost four years ago. It was reported in this [zulip thread](https://hail.zulipchat.com/#narrow/stream/123010-Hail-Query-0.2E2-support/topic/Number.20of.20rows.20changing.20with.20partitioning). I want to do further work to better characterize exactly what it takes to be affected by this bug, but I think you must have a table or matrixtable on disk which has duplicate keys, and moreover keys which span neighboring partitions, and then you must read the data with a different partitioner. The root of the issue is an invalid assumption made in the code. To read data written with partitioner `p1` using new partitioner `p2`, it first computes the "intersection", or common refinement, of the two. It then assumes that each partition in the refinement overlaps exactly one partition of `p1`. But this is only true if the partitions of `p1` are themselves mutually disjoint, which is usually but not necessarily true. For example, suppose `p1 = [ [1, 5], [5, 8] ]` is the old partitioner, and `p2 = [ [1, 4), [4, 8] ]` is the new. Note that the two input partitions are not disjoint, as the key `5` is allowed in both. The common refinement would then be `[ [1, 4), [4, 5], [5, 8] ]`. For each partition in the refinement, we want to read in the corresponding range from the appropriate input partition, then we want to group the partitions in the refinement to match the new partitioner. The code finds "the appropriate input partition" by taking the first input partition which overlaps the refinement partition, using `lowerBoundInterval`. That works if there is only one overlapping input partition, but here fails, since the refinement partition `[5, 8]` overlaps both input partitions. So the code mistakenly reads from the input partition `[1, 5]` to produce the refinement partition `[5, 8]`, and so completely drops all rows in the input `[5, 8]`. In practice, I think the most likely way to run into this (and the way it was found by a user) is to have a dataset keyed by `["locus", "alleles"]`, which has split multi-allelics, so there are multiple rows with the same locus. Then shorten the key to `["locus"]`, write the dataset to disk, and read it back with a different partitioning, e.g. by passing a `_n_partitions` argument to `read_table` or `read_matrix_table`. For instance, if the partitioning was originally `[ [{1:1, ["A"]}, {1:500, ["G"]}), [{1:500, ["G"]}, {1:1000, ["C"]}] ]`, then after shortening the key it would be `[ [1:1, 1:500], [1:500, 1:1000] ]`. Notice that even though the original partitioning had no overlap, it does after shortening the key, because rows with locus `1:500` with alleles less than `["G"]` are allowed in the first partition, so we have to make the right endpoint inclusive after shortening. You would then need to write this rekeyed dataset to disk and read it back with different partitioning (note that `ds.repartition` is enough to do this in the batch backend). I still need to think through what holes in our testing allowed this to remain undetected for so long, and attempt to plug them. We should also plan for what to tell a user who is concerned they may have been affected by this in the past.
No description provided.